home *** CD-ROM | disk | FTP | other *** search
/ PC World Komputer 2010 April / PCWorld0410.iso / hity wydania / Ubuntu 9.10 PL / karmelkowy-koliberek-desktop-9.10-i386-PL.iso / casper / filesystem.squashfs / usr / share / pyshared / checkbox / message.py < prev    next >
Text File  |  2009-11-05  |  10KB  |  281 lines

  1. #
  2. # This file is part of Checkbox.
  3. #
  4. # Copyright 2008 Canonical Ltd.
  5. #
  6. # Checkbox is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # Checkbox is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with Checkbox.  If not, see <http://www.gnu.org/licenses/>.
  18. #
  19. import os
  20. import logging
  21. import itertools
  22. import posixpath
  23.  
  24. from checkbox.contrib import bpickle
  25.  
  26.  
  27. HELD = "h"
  28. BROKEN = "b"
  29.  
  30. ANCIENT = 1
  31.  
  32.  
  33. class MessageStore(object):
  34.     """A message store which stores its messages in a file system hierarchy."""
  35.  
  36.     def __init__(self, persist, directory, directory_size=1000):
  37.         self._directory = directory
  38.         self._directory_size = directory_size
  39.         self._original_persist = persist
  40.         self._persist = persist.root_at("message-store")
  41.         message_dir = self._message_dir()
  42.         if not posixpath.isdir(message_dir):
  43.             os.makedirs(message_dir)
  44.  
  45.     def commit(self):
  46.         """Save metadata to disk."""
  47.         self._original_persist.save()
  48.  
  49.     def get_sequence(self):
  50.         """
  51.         Get the sequence number of the message that the server expects us to
  52.         send on the next exchange.
  53.         """
  54.         return self._persist.get("sequence", 0)
  55.  
  56.     def set_sequence(self, number):
  57.         """
  58.         Set the sequence number of the message that the server expects us to
  59.         send on the next exchange.
  60.         """
  61.         self._persist.set("sequence", number)
  62.  
  63.     def get_pending_offset(self):
  64.         return self._persist.get("pending_offset", 0)
  65.  
  66.     def set_pending_offset(self, val):
  67.         """
  68.         Set the offset into the message pool to consider assigned to the
  69.         current sequence number as returned by l{get_sequence}.
  70.         """
  71.         self._persist.set("pending_offset", val)
  72.  
  73.     def add_pending_offset(self, val):
  74.         self.set_pending_offset(self.get_pending_offset() + val)
  75.  
  76.     def count_pending_messages(self):
  77.         """Return the number of pending messages."""
  78.         return sum(1 for x in self._walk_pending_messages())
  79.  
  80.     def get_pending_messages(self, max=None):
  81.         """Get any pending messages that aren't being held, up to max."""
  82.         messages = []
  83.         for filename in self._walk_pending_messages():
  84.             if max is not None and len(messages) >= max:
  85.                 break
  86.             data = self._get_content(filename)
  87.             try:
  88.                 message = bpickle.loads(data)
  89.             except ValueError, e:
  90.                 logging.exception(e)
  91.                 self._add_flags(filename, BROKEN)
  92.             else:
  93.                 messages.append(message)
  94.         return messages
  95.  
  96.     def set_pending_flags(self, flags):
  97.         for filename in self._walk_pending_messages():
  98.             self._set_flags(filename, flags)
  99.             break
  100.  
  101.     def add_pending_flags(self, flags):
  102.         for filename in self._walk_pending_messages():
  103.             self._add_flags(filename, flags)
  104.             break
  105.  
  106.     def delete_old_messages(self):
  107.         """Delete messages which are unlikely to be needed in the future."""
  108.         filenames = self._get_sorted_filenames()
  109.         for fn in itertools.islice(self._walk_messages(exclude=HELD+BROKEN),
  110.                                    self.get_pending_offset()):
  111.             os.unlink(fn)
  112.             containing_dir = posixpath.split(fn)[0]
  113.             if not os.listdir(containing_dir):
  114.                 os.rmdir(containing_dir)
  115.  
  116.     def delete_all_messages(self):
  117.         """Remove ALL stored messages."""
  118.         self.set_pending_offset(0)
  119.         for filename in self._walk_messages():
  120.             os.unlink(filename)
  121.  
  122.     def is_pending(self, message_id):
  123.         """Return bool indicating if C{message_id} still hasn't been delivered.
  124.  
  125.         @param message_id: Identifier returned by the L{add()} method.
  126.         """
  127.         i = 0
  128.         pending_offset = self.get_pending_offset()
  129.         for filename in self._walk_messages(exclude=BROKEN):
  130.             flags = self._get_flags(filename)
  131.             if ((HELD in flags or i >= pending_offset) and
  132.                 os.stat(filename).st_ino == message_id):
  133.                 return True
  134.             if BROKEN not in flags and HELD not in flags:
  135.                 i += 1
  136.         return False
  137.  
  138.     def add(self, message):
  139.         """Queue a message for delivery.
  140.  
  141.         @return: message_id, which is an identifier for the added message.
  142.         """
  143.         assert "type" in message
  144.  
  145.         message_data = bpickle.dumps(message)
  146.  
  147.         filename = self._get_next_message_filename()
  148.  
  149.         file = open(filename + ".tmp", "w")
  150.         file.write(message_data)
  151.         file.close()
  152.         os.rename(filename + ".tmp", filename)
  153.  
  154.         # For now we use the inode as the message id, as it will work
  155.         # correctly even faced with holding/unholding.  It will break
  156.         # if the store is copied over for some reason, but this shouldn't
  157.         # present an issue given the current uses.  In the future we
  158.         # should have a nice transactional storage (e.g. sqlite) which
  159.         # will offer a more strong primary key.
  160.         message_id = os.stat(filename).st_ino
  161.  
  162.         return message_id
  163.  
  164.     def _get_next_message_filename(self):
  165.         message_dirs = self._get_sorted_filenames()
  166.         if message_dirs:
  167.             newest_dir = message_dirs[-1]
  168.         else:
  169.             os.makedirs(self._message_dir("0"))
  170.             newest_dir = "0"
  171.  
  172.         message_filenames = self._get_sorted_filenames(newest_dir)
  173.         if not message_filenames:
  174.             filename = self._message_dir(newest_dir, "0")
  175.         elif len(message_filenames) < self._directory_size:
  176.             filename = str(int(message_filenames[-1].split("_")[0]) + 1)
  177.             filename = self._message_dir(newest_dir, filename)
  178.         else:
  179.             newest_dir = self._message_dir(str(int(newest_dir) + 1))
  180.             os.makedirs(newest_dir)
  181.             filename = posixpath.join(newest_dir, "0")
  182.  
  183.         return filename
  184.  
  185.     def _walk_pending_messages(self):
  186.         """Walk the files which are definitely pending."""
  187.         pending_offset = self.get_pending_offset()
  188.         for i, filename in enumerate(self._walk_messages(exclude=HELD+BROKEN)):
  189.             if i >= pending_offset:
  190.                 yield filename
  191.  
  192.     def _walk_messages(self, exclude=None):
  193.         if exclude:
  194.             exclude = set(exclude)
  195.         message_dirs = self._get_sorted_filenames()
  196.         for message_dir in message_dirs:
  197.             for filename in self._get_sorted_filenames(message_dir):
  198.                 flags = set(self._get_flags(filename))
  199.                 if (not exclude or not exclude & flags):
  200.                     yield self._message_dir(message_dir, filename)
  201.  
  202.     def _get_sorted_filenames(self, dir=""):
  203.         message_files = [x for x in os.listdir(self._message_dir(dir))
  204.                          if not x.endswith(".tmp")]
  205.         message_files = sorted(message_files,
  206.             key=lambda x: int(x.split("_")[0]))
  207.         return message_files
  208.  
  209.     def _message_dir(self, *args):
  210.         return posixpath.join(self._directory, *args)
  211.  
  212.     def _get_content(self, filename):
  213.         file = open(filename)
  214.         try:
  215.             return file.read()
  216.         finally:
  217.             file.close()
  218.  
  219.     def _get_flags(self, path):
  220.         basename = posixpath.basename(path)
  221.         if "_" in basename:
  222.             return basename.split("_")[1]
  223.         return ""
  224.  
  225.     def _set_flags(self, path, flags):
  226.         dirname, basename = posixpath.split(path)
  227.         new_path = posixpath.join(dirname, basename.split("_")[0])
  228.         if flags:
  229.             new_path += "_"+"".join(sorted(set(flags)))
  230.         os.rename(path, new_path)
  231.         return new_path
  232.  
  233.     def _add_flags(self, path, flags):
  234.         self._set_flags(path, self._get_flags(path)+flags)
  235.  
  236.  
  237. def got_next_sequence(message_store, next_sequence):
  238.     """Our peer has told us what it expects our next message's sequence to be.
  239.  
  240.     Call this with the message store and sequence number that the peer
  241.     wants next; this will do various things based on what *this* side
  242.     has in its outbound queue store.
  243.  
  244.     1. The peer expects a sequence greater than what we last
  245.        sent. This is the common case and generally it should be
  246.        expecting last_sent_sequence+len(messages_sent)+1.
  247.  
  248.     2. The peer expects a sequence number our side has already sent,
  249.        and we no longer have that message. In this case, just send
  250.        *all* messages we have, including the previous generation,
  251.        starting at the sequence number the peer expects (meaning that
  252.        messages have probably been lost).
  253.  
  254.     3. The peer expects a sequence number we already sent, and we
  255.        still have that message cached. In this case, we send starting
  256.        from that message.
  257.  
  258.     If the next sequence from the server refers to a message older than
  259.     we have, then L{ANCIENT} will be returned.
  260.     """
  261.     ret = None
  262.     old_sequence = message_store.get_sequence()
  263.     if next_sequence > old_sequence:
  264.         message_store.delete_old_messages()
  265.         pending_offset = next_sequence - old_sequence
  266.     elif next_sequence < (old_sequence - message_store.get_pending_offset()):
  267.         # "Ancient": The other side wants messages we don't have,
  268.         # so let's just reset our counter to what it expects.
  269.         pending_offset = 0
  270.         ret = ANCIENT
  271.     else:
  272.         # No messages transferred, or
  273.         # "Old": We'll try to send these old messages that the
  274.         # other side still wants.
  275.         pending_offset = (message_store.get_pending_offset() + next_sequence
  276.                           - old_sequence)
  277.  
  278.     message_store.set_pending_offset(pending_offset)
  279.     message_store.set_sequence(next_sequence)
  280.     return ret
  281.